package worker

import (
	"context"
	"gitee.com/zhengjianfeng/crontab/common"
	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/mvcc/mvccpb"
	"time"
)

// 任务管理器
type JobMgr struct {
	client  *clientv3.Client
	kv      clientv3.KV
	lease   clientv3.Lease
	watcher clientv3.Watcher
}

var (
	// 单例
	G_jobMgr *JobMgr
)

// 监听任务的变化

func (jobMgr *JobMgr) watchJobs() (err error) {

	var (
		getResp            *clientv3.GetResponse
		kvPair             *mvccpb.KeyValue
		job                *common.Job
		watchStartRevition int64
		watchChan          clientv3.WatchChan
		watchResp          clientv3.WatchResponse
		watchEvent         *clientv3.Event
		jobName            string
		jobEvent           *common.JobEvent
	)

	// 1、get一下/cron/jobs/目录下的所有任务，并且获取当前集群的revision
	if getResp, err = jobMgr.kv.Get(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithPrefix()); err != nil {
		return
	}

	// 当前有哪些任务
	for _, kvPair = range getResp.Kvs {
		// 反序列化json得到Job
		if job, err = common.UnpackJob(kvPair.Value); err != nil {
			err = nil
			continue
		}

		jobEvent = common.BulidJobEvent(common.JOB_EVENT_SAVE, job)

		// 把这个job同步给scheduler（调度协程）
		G_scheduler.PushJobEvent(jobEvent)
	}

	// 2、从该revition向后监听变化时间
	go func() { // 监听协程

		// 从GET时刻的后续版本开始监听变化
		watchStartRevition = getResp.Header.Revision + 1

		// 启动监听/cron/jobs/目录的后续变化
		watchChan = jobMgr.watcher.Watch(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithRev(watchStartRevition), clientv3.WithPrefix())

		// 处理监听
		for watchResp = range watchChan {
			for _, watchEvent = range watchResp.Events {
				switch watchEvent.Type {
				case mvccpb.PUT: // 任务保存
					if job, err = common.UnpackJob(watchEvent.Kv.Value); err != nil {
						continue
					}

					// 构造一个更新Event事件
					jobEvent = common.BulidJobEvent(common.JOB_EVENT_SAVE, job)

				case mvccpb.DELETE: // 任务被删除了
					jobName = common.ExtractJobName(string(watchEvent.Kv.Key))

					job = &common.Job{Name: jobName}

					// 构造一个删除Event
					jobEvent = common.BulidJobEvent(common.JOB_EVENT_DELETE, job)
				}

				//推给scheduler（调度协程）
				G_scheduler.PushJobEvent(jobEvent)
			}
		}
	}()
	return
}

// 监听强杀任务通知
func (jobMgr *JobMgr) watchKiller() (err error) {

	var (
		job        *common.Job
		watchChan  clientv3.WatchChan
		watchResp  clientv3.WatchResponse
		watchEvent *clientv3.Event
		jobName    string
		jobEvent   *common.JobEvent
	)

	// 监听/cron/killer目录
	go func() { // 监听协程
		// 启动监听/cron/killer/目录的变化
		watchChan = jobMgr.watcher.Watch(context.TODO(), common.JOB_KILLER_DIR, clientv3.WithPrefix())

		// 处理监听
		for watchResp = range watchChan {
			for _, watchEvent = range watchResp.Events {
				switch watchEvent.Type {
				case mvccpb.PUT: // 杀死任务事件
					jobName = common.ExtractKillerName(string(watchEvent.Kv.Key))
					job = &common.Job{Name: jobName}
					jobEvent = common.BulidJobEvent(common.JOB_EVENT_KILL, job)
					//事件推给scheduler（调度协程）
					G_scheduler.PushJobEvent(jobEvent)
				case mvccpb.DELETE: // killer标记过期,被自动删除

				}
			}
		}
	}()
	return
}

// 初始化管理器
func InitJobMgr() (err error) {

	var (
		config  clientv3.Config
		client  *clientv3.Client
		kv      clientv3.KV
		lease   clientv3.Lease
		watcher clientv3.Watcher
	)

	config = clientv3.Config{
		Endpoints:   G_config.EtcdEndPoints,                                     // 集群地址
		DialTimeout: time.Duration(G_config.EtcdDialTimeout) * time.Millisecond, // 超时时间
	}

	// 建立连接
	if client, err = clientv3.New(config); err != nil {
		return
	}

	// 得到KV和Lease的API子集
	kv = clientv3.NewKV(client)
	lease = clientv3.NewLease(client)
	watcher = clientv3.NewWatcher(client)

	// 赋值单例
	G_jobMgr = &JobMgr{
		client:  client,
		kv:      kv,
		lease:   lease,
		watcher: watcher,
	}

	// 启动任务监听
	err = G_jobMgr.watchJobs()

	// 启动监听killer
	err = G_jobMgr.watchKiller()

	return
}

// 创建任务执行锁
func (jobMgr *JobMgr) CreateJobLock(jobName string) (jobLock *JobLock) {
	// 返回一把锁
	jobLock = InitJobLock(jobName, jobMgr.kv, jobMgr.lease)
	return
}
